{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "view-in-colab"
      },
      "source": [
        "<a href=\"https://colab.research.google.com/github/pathwaycom/pathway/blob/main/examples/projects/from_jupyter_to_deploy/part3_kafka_and_alerts.ipynb\" target=\"_parent\"><img src=\"https://pathway.com/assets/colab-badge.svg\" alt=\"Run In Colab\" class=\"inline\"/></a>"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "notebook-instructions"
      },
      "source": [
        "# Installing Pathway with Python 3.10+\n",
        "\n",
        "In the cell below, we install Pathway into a Python 3.10+ Linux runtime.\n",
        "\n",
        "> **If you are running in Google Colab, please run all the cells of the notebook (Ctrl+F9)**, disregarding the 'not authored by Google' warning.\n",
        "> \n",
        "> **The installation and loading time is less than 1 minute**.\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "cellView": "form",
        "id": "pip-installation-pathway"
      },
      "outputs": [],
      "source": [
        "%%capture --no-display\n",
        "# %pip (rather than !pip) installs into the environment of the running kernel.\n",
        "%pip install --prefer-binary pathway"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "1",
      "metadata": {},
      "source": [
        "# Part 3: Kafka integration and alerts forwarding (Consumer)\n",
        "This notebook accompanies Part 3 of the tutorial [From interactive data exploration to deployment](https://pathway.com/developers/user-guide/exploring-pathway/from-jupyter-to-deploy#part-3-kafka-integration-and-alerts-forwarding)."
      ]
    },
    {
      "cell_type": "markdown",
      "id": "2",
      "metadata": {},
      "source": [
        "## Reading messages from Kafka"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "3",
      "metadata": {},
      "outputs": [],
      "source": [
        "import datetime\n",
        "\n",
        "import pathway as pw\n",
        "\n",
        "# Advanced features are part of Pathway Scale: get a free license key from\n",
        "# https://pathway.com/features and paste it in the call below.\n",
        "# Comment the call out to use Pathway Community instead.\n",
        "pw.set_license_key(\"demo-license-key-with-telemetry\")\n",
        "\n",
        "\n",
        "# Autogenerated schema of the messages, passed to `pw.io.kafka.read` below.\n",
        "class DataSchema(pw.Schema):\n",
        "    ticker: str\n",
        "    open: float\n",
        "    high: float\n",
        "    low: float\n",
        "    close: float\n",
        "    volume: float\n",
        "    vwap: float\n",
        "    t: int\n",
        "    transactions: int\n",
        "    otc: str\n",
        "\n",
        "\n",
        "# TODO Please replace the KAFKA_ENDPOINT, KAFKA_USERNAME, and KAFKA_PASSWORD placeholders\n",
        "rdkafka_consumer_settings = {\n",
        "    # Broker address\n",
        "    \"bootstrap.servers\": \"KAFKA_ENDPOINT:9092\",\n",
        "    # Authentication\n",
        "    \"security.protocol\": \"sasl_ssl\",\n",
        "    \"sasl.mechanism\": \"SCRAM-SHA-256\",\n",
        "    \"sasl.username\": \"KAFKA_USERNAME\",\n",
        "    \"sasl.password\": \"KAFKA_PASSWORD\",\n",
        "    # Consumer group and offset policy\n",
        "    \"group.id\": \"kafka-group-0\",\n",
        "    \"auto.offset.reset\": \"earliest\",\n",
        "}\n",
        "\n",
        "# Read the JSON messages of the \"ticker\" topic into a table following DataSchema.\n",
        "data = pw.io.kafka.read(\n",
        "    rdkafka_consumer_settings,\n",
        "    topic=\"ticker\",\n",
        "    format=\"json\",\n",
        "    schema=DataSchema,\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "4",
      "metadata": {},
      "outputs": [],
      "source": [
        "# Replace the integer `t` column by its conversion with `utc_from_timestamp`,\n",
        "# reading the stored value in milliseconds (unit=\"ms\").\n",
        "data = data.with_columns(t=pw.this.t.dt.utc_from_timestamp(unit=\"ms\"))"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "5",
      "metadata": {},
      "outputs": [],
      "source": [
        "# Sliding windows of 20 minutes, with a new window starting every minute.\n",
        "sliding_20m = pw.temporal.sliding(\n",
        "    hop=datetime.timedelta(minutes=1),\n",
        "    duration=datetime.timedelta(minutes=20),\n",
        ")\n",
        "\n",
        "minute_20_stats = (\n",
        "    data.windowby(\n",
        "        data.t,\n",
        "        window=sliding_20m,\n",
        "        behavior=pw.temporal.exactly_once_behavior(),\n",
        "        instance=data.ticker,\n",
        "    )\n",
        "    # Per-window sums of volume, volume * vwap and volume * vwap**2.\n",
        "    .reduce(\n",
        "        ticker=pw.this._pw_instance,\n",
        "        t=pw.this._pw_window_end,\n",
        "        volume=pw.reducers.sum(pw.this.volume),\n",
        "        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),\n",
        "        transact_total2=pw.reducers.sum(pw.this.volume * pw.this.vwap**2),\n",
        "    )\n",
        "    # Volume-weighted average price of the window.\n",
        "    .with_columns(vwap=pw.this.transact_total / pw.this.volume)\n",
        "    # Volume-weighted standard deviation: sqrt(mean of squares - squared mean).\n",
        "    .with_columns(\n",
        "        vwstd=(pw.this.transact_total2 / pw.this.volume - pw.this.vwap**2) ** 0.5\n",
        "    )\n",
        "    # Bollinger bands: two standard deviations on each side of the average.\n",
        "    .with_columns(\n",
        "        bollinger_upper=pw.this.vwap + 2 * pw.this.vwstd,\n",
        "        bollinger_lower=pw.this.vwap - 2 * pw.this.vwstd,\n",
        "    )\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "6",
      "metadata": {},
      "outputs": [],
      "source": [
        "# Tumbling windows of 1 minute.\n",
        "tumbling_1m = pw.temporal.tumbling(datetime.timedelta(minutes=1))\n",
        "\n",
        "minute_1_stats = (\n",
        "    data.windowby(\n",
        "        data.t,\n",
        "        window=tumbling_1m,\n",
        "        behavior=pw.temporal.exactly_once_behavior(),\n",
        "        instance=data.ticker,\n",
        "    )\n",
        "    # Per-window sums of volume and volume * vwap.\n",
        "    .reduce(\n",
        "        ticker=pw.this._pw_instance,\n",
        "        t=pw.this._pw_window_end,\n",
        "        volume=pw.reducers.sum(pw.this.volume),\n",
        "        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),\n",
        "    )\n",
        "    # Volume-weighted average price of the window.\n",
        "    .with_columns(vwap=pw.this.transact_total / pw.this.volume)\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "7",
      "metadata": {},
      "outputs": [],
      "source": [
        "joint_stats = (\n",
        "    # Match 1-minute and 20-minute rows sharing the same `t` and `ticker`.\n",
        "    minute_1_stats.join(\n",
        "        minute_20_stats,\n",
        "        minute_1_stats.t == minute_20_stats.t,\n",
        "        minute_1_stats.ticker == minute_20_stats.ticker,\n",
        "    )\n",
        "    # Keep every 1-minute column and add the 20-minute Bollinger bands.\n",
        "    .select(\n",
        "        *pw.left,\n",
        "        bollinger_lower=pw.right.bollinger_lower,\n",
        "        bollinger_upper=pw.right.bollinger_upper,\n",
        "    )\n",
        "    # Alert when the volume exceeds 10000 and the vwap lies outside the bands.\n",
        "    .with_columns(\n",
        "        is_alert=(\n",
        "            (pw.this.volume > 10000)\n",
        "            & (\n",
        "                (pw.this.vwap > pw.this.bollinger_upper)\n",
        "                | (pw.this.vwap < pw.this.bollinger_lower)\n",
        "            )\n",
        "        )\n",
        "    )\n",
        "    # \"sell\" above the upper band, \"buy\" for the other alerts, \"hodl\" without alert.\n",
        "    .with_columns(\n",
        "        action=pw.if_else(\n",
        "            pw.this.is_alert,\n",
        "            pw.if_else(pw.this.vwap > pw.this.bollinger_upper, \"sell\", \"buy\"),\n",
        "            \"hodl\",\n",
        "        )\n",
        "    )\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "8",
      "metadata": {},
      "outputs": [],
      "source": [
        "# Rows flagged as alerts, restricted to the ticker, t, vwap and action columns.\n",
        "alert_rows = joint_stats.filter(joint_stats.is_alert)\n",
        "alerts = alert_rows.select(\n",
        "    alert_rows.ticker,\n",
        "    alert_rows.t,\n",
        "    alert_rows.vwap,\n",
        "    alert_rows.action,\n",
        ")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "9",
      "metadata": {},
      "outputs": [],
      "source": [
        "import bokeh.models\n",
        "import bokeh.plotting  # explicit import: `bokeh.plotting.figure` is used below\n",
        "\n",
        "\n",
        "def stats_plotter(src):\n",
        "    \"\"\"Build the Bollinger bands figure drawn from `src`.\n",
        "\n",
        "    The figure shows the `vwap` line, the `bollinger_lower` / `bollinger_upper`\n",
        "    lines with a gray area between them, and one circle per row colored by its\n",
        "    `action` (\"buy\", \"sell\" or \"hodl\").\n",
        "\n",
        "    Args:\n",
        "        src: data source handed to every glyph; it must expose the columns\n",
        "            `t`, `vwap`, `bollinger_lower`, `bollinger_upper` and `action`\n",
        "            (presumably a Bokeh ColumnDataSource supplied by the caller).\n",
        "\n",
        "    Returns:\n",
        "        The Bokeh figure.\n",
        "    \"\"\"\n",
        "    actions = [\"buy\", \"sell\", \"hodl\"]\n",
        "    # One color per action, in the same order as `actions`.\n",
        "    color_map = bokeh.models.CategoricalColorMapper(\n",
        "        factors=actions, palette=(\"#00ff00\", \"#ff0000\", \"#00000000\")\n",
        "    )\n",
        "\n",
        "    fig = bokeh.plotting.figure(\n",
        "        height=400,\n",
        "        width=600,\n",
        "        title=\"20 minutes Bollinger bands with last 1 minute average\",\n",
        "        x_axis_type=\"datetime\",\n",
        "    )\n",
        "\n",
        "    fig.line(\"t\", \"vwap\", source=src)\n",
        "\n",
        "    fig.line(\"t\", \"bollinger_lower\", source=src, line_alpha=0.3)\n",
        "    fig.line(\"t\", \"bollinger_upper\", source=src, line_alpha=0.3)\n",
        "    fig.varea(\n",
        "        x=\"t\",\n",
        "        y1=\"bollinger_lower\",\n",
        "        y2=\"bollinger_upper\",\n",
        "        fill_alpha=0.3,\n",
        "        fill_color=\"gray\",\n",
        "        source=src,\n",
        "    )\n",
        "\n",
        "    fig.scatter(\n",
        "        \"t\",\n",
        "        \"vwap\",\n",
        "        size=10,\n",
        "        marker=\"circle\",\n",
        "        color={\"field\": \"action\", \"transform\": color_map},\n",
        "        source=src,\n",
        "    )\n",
        "\n",
        "    return fig"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "10",
      "metadata": {},
      "outputs": [],
      "source": [
        "import panel as pn\n",
        "\n",
        "# Plot of the joint statistics, with rows ordered by `t`.\n",
        "stats_plot = joint_stats.plot(stats_plotter, sorting_col=\"t\")\n",
        "# Table of alerts without the id column, sorted by `t` in descending order.\n",
        "alerts_table = alerts.show(\n",
        "    include_id=False, sorters=[{\"field\": \"t\", \"dir\": \"desc\"}]\n",
        ")\n",
        "\n",
        "# Both widgets side by side.\n",
        "viz = pn.Row(stats_plot, alerts_table)\n",
        "viz"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "11",
      "metadata": {},
      "source": [
        "## Alerts forwarding to Slack"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "12",
      "metadata": {},
      "outputs": [],
      "source": [
        "import requests\n",
        "\n",
        "# TODO Please set appropriate values for SLACK_CHANNEL_ID and SLACK_TOKEN\n",
        "slack_alert_channel_id = \"SLACK_CHANNEL_ID\"\n",
        "slack_alert_token = \"SLACK_TOKEN\"\n",
        "\n",
        "\n",
        "def send_slack_alert(key, row, time, is_addition):\n",
        "    \"\"\"Post a Slack message for an alert row added to the subscribed table.\n",
        "\n",
        "    Args:\n",
        "        key: unused.\n",
        "        row: mapping holding at least the \"action\" and \"ticker\" values.\n",
        "        time: unused.\n",
        "        is_addition: when false, the call is ignored and nothing is sent.\n",
        "\n",
        "    Raises:\n",
        "        requests.HTTPError: if Slack answers with an HTTP error status.\n",
        "        requests.Timeout: if Slack does not answer within the timeout.\n",
        "    \"\"\"\n",
        "    if not is_addition:\n",
        "        return\n",
        "    alert_message = f'Please {row[\"action\"]} {row[\"ticker\"]}'\n",
        "    print(f'Sending alert \"{alert_message}\"')\n",
        "    requests.post(\n",
        "        \"https://slack.com/api/chat.postMessage\",\n",
        "        # A dict is form-encoded by requests, so spaces and reserved characters\n",
        "        # (such as \"&\") in the message cannot corrupt the request body.\n",
        "        data={\"text\": alert_message, \"channel\": slack_alert_channel_id},\n",
        "        headers={\n",
        "            \"Authorization\": \"Bearer {}\".format(slack_alert_token),\n",
        "            \"Content-Type\": \"application/x-www-form-urlencoded\",\n",
        "        },\n",
        "        # Without a timeout, a stalled connection would block the callback forever.\n",
        "        timeout=10,\n",
        "    ).raise_for_status()\n",
        "\n",
        "\n",
        "pw.io.subscribe(alerts, send_slack_alert)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "id": "13",
      "metadata": {},
      "outputs": [],
      "source": [
        "# Run the Pathway computation set up in the cells above.\n",
        "pw.run()"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3 (ipykernel)",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.11.9"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
